package com.shujia.flink.kafka

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo3OnKafkaExactlyOnceWiterFile {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    /**
      * Enable checkpointing.
      */

    // trigger a checkpoint every 20000 ms (20 s)
    env.enableCheckpointing(20000)

    // advanced options:

    // set the checkpointing mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // make sure at least 500 ms pass between the end of one checkpoint and the start of the next
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // a checkpoint must complete within one minute or it is discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // allow only one checkpoint to be in progress at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // enable externalized checkpoints that are retained after the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // allow falling back to a checkpoint for recovery even when a more recent savepoint exists
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
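
    // Since checkpoints are retained on cancellation, a cancelled job can be
    // resumed from the last retained checkpoint with the Flink CLI's -s flag.
    // A sketch (the job id and checkpoint number below are placeholders):
    //   flink run -s hdfs://master:9000/flink/checkpoints/<job-id>/chk-<n> <job-jar>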


    // state backend: keep working state in RocksDB, checkpoint to the HDFS path below
    env.setStateBackend(new RocksDBStateBackend("hdfs://master:9000/flink/checkpoints"))
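
    // A sketch of an alternative, assuming the same HDFS path: the two-argument
    // constructor enables incremental checkpoints, so RocksDB uploads only the
    // changes since the last completed checkpoint instead of a full snapshot.
    //   env.setStateBackend(new RocksDBStateBackend("hdfs://master:9000/flink/checkpoints", true))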


    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    properties.setProperty("group.id", "asdasfasdas")
    properties.setProperty("flink.partition-discovery.interval-millis", "100")

    // create the Kafka source
    val kafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](
      "trans",
      new SimpleStringSchema(),
      properties)

    // start from the committed group offsets (the default behavior)
    kafkaConsumer.setStartFromGroupOffsets()
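
    // Other start positions FlinkKafkaConsumer offers, for reference:
    //   kafkaConsumer.setStartFromEarliest()      // ignore committed offsets, read from the beginning
    //   kafkaConsumer.setStartFromLatest()        // ignore committed offsets, read only new records
    //   kafkaConsumer.setStartFromTimestamp(ts)   // ts: an epoch-millis timestamp (placeholder value)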

    val kafkads: DataStream[String] = env.addSource(kafkaConsumer)

    // print the consumed records to stdout for debugging
    kafkads.print()

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hdfs://master:9000/flink/out"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // roll a part file at least every 15 minutes
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 minutes without new records
          .withMaxPartSize(1024 * 1024 * 1024) // roll once a part file reaches 1 GiB
          .build())
      .build()
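
    // Note: an alternative sketch that rolls part files only on checkpoints,
    // which is the policy bulk formats (e.g. Parquet) require, would be:
    //   .withRollingPolicy(OnCheckpointRollingPolicy.build())
    // importing org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy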


    /**
      * With checkpointing enabled, StreamingFileSink only finalizes part files
      * once a checkpoint completes, so the written data becomes visible with a
      * delay of up to the checkpoint interval (20 s here).
      */

    kafkads.addSink(sink)
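
    // For exactly-once delivery back into Kafka instead of HDFS, the connector's
    // FlinkKafkaProducer supports Kafka transactions that commit on checkpoint
    // completion. A minimal sketch, assuming an output topic named "out":
    //   val producer = new FlinkKafkaProducer[String](
    //     "out",
    //     new KafkaSerializationSchema[String] {
    //       override def serialize(e: String, ts: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    //         new ProducerRecord("out", e.getBytes("UTF-8"))
    //     },
    //     properties,
    //     FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    //   kafkads.addSink(producer)
    // (requires imports of FlinkKafkaProducer, KafkaSerializationSchema and
    //  org.apache.kafka.clients.producer.ProducerRecord)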

    env.execute()


  }
}
